In [1]:
:load KrapshDisplays KrapshDagDisplay

In [2]:
:extension DeriveGeneric
:extension FlexibleContexts
:extension GeneralizedNewtypeDeriving
:extension FlexibleInstances
:extension MultiParamTypeClasses

In [3]:
import Spark.Core.Dataset
import Spark.Core.Context
import Spark.Core.Column
import Spark.Core.ColumnFunctions
import Spark.Core.Functions
import Spark.Core.Row
import Spark.Core.Types
import Spark.Core.Try

import qualified Data.Vector as V
import qualified Data.Text as T
import GHC.Generics

Typed and untyped data with Krapsh

It is common to start data exploration with untyped data, to see how things fit together, and to gradually introduce more types as a pipeline moves towards production.

Krapsh offers a typed and an untyped API, reflecting different tradeoffs:

  • the untyped API is easier to use, but most mistakes will only be exposed to the user just before running the computation. Do not worry though, you can still force runtime failures.
  • the typed API requires a bit more machinery and its error messages may not be as convenient. However, it offers stronger guarantees that the operations are legitimate, and it can be enforced across a large codebase without having to run any code first. (Both styles are sketched just below.)
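
As a quick preview of the two styles, here is how the same column lookup reads in each one. This sketch reuses the treesDF dataframe and the treesDS dataset that are built later in this notebook, so take it purely as an illustration:

-- Untyped: lookup by name; mistakes surface as a Left value when the graph is built
treesDF // "treeId"      -- :: DynColumn
-- Typed: lookup through a static projection; mistakes are rejected by the compiler
treesDS // treeId'       -- :: Column Tree Int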

We are going to explore these tradeoffs on a small task of filtering data.

You are given a dataset about the nearby forest that describes each tree observed:

  • tree id
  • tree width
  • tree height

We are going to compute various statistics about these trees.


In [4]:
rawData = [(1, 3, 2)] :: [(Int, Int, Int)] -- (tree id, width, height)

For the sake of this example, we manually build up the content in a form that is digestible by Krapsh. In practice, though, this data would come from one of Spark's many input sources.


In [5]:
let fun (id', width, height) = RowArray $ V.fromList [IntElement id', IntElement width, IntElement height]
dataCells = fun <$> rawData
dataCells


[RowArray (fromList [IntElement 1,IntElement 3,IntElement 2])]

Dynamic operations with Dataframes

The first way to build a dataframe is to combine a datatype with a list of cells. We can build the datatype by hand, and then call the dataframe function:


In [6]:
-- The API is not stable yet, importing some internals
import Spark.Core.Internal.TypesFunctions
dt = structType [
        structField (T.pack "treeId") intType,
        structField (T.pack "treeWidth") intType,
        structField (T.pack "treeHeight") intType]

treesDF = dataframe dt dataCells
treesDF


Right constant_d5e4f2@org.spark.Constant{{treeId:int treeWidth:int treeHeight:int}}

As one can see above, this operation succeeded and lets us manipulate the data: we can extract columns, combine them, and so on.


In [7]:
idCol = treesDF // "treeId"
idCol


Right treeId{int}->constant_d5e4f2

In [8]:
widthCol = treesDF//"treeWidth"
doubleWidthCol = (widthCol + widthCol) @@ "doubleWidth"

At the end of the day, all the resulting columns can be packed again into a new dataframe:


In [9]:
outputDF = pack' [idCol, doubleWidthCol]
outputDF


Right select_62f9d1@org.spark.Select{{treeId:int doubleWidth:int}}

Of course, this API does not prevent us from doing weird operations:


In [10]:
-- What does that mean?
weirdCol = idCol + widthCol
weirdCol2 = idCol + 1

And Haskell will not help us if the schema of the dataframe changes:


In [11]:
treesDF // "missingColumn"


Left (Error {ePath = NPath(), eMessage = "unsafeStaticProjection: Cannot find the field missingColumn in type {treeId:int treeWidth:int treeHeight:int}"})

Simple typing operations with Datasets

We can enforce some level of typing by making the structure of the data available to Krapsh. Here is how to associate a data structure with data handled by Krapsh. We start with a simple representation that uses raw types:


In [12]:
data Tree = Tree {
  treeId :: Int,
  treeWidth :: Int,
  treeHeight :: Int } deriving (Generic, Show) -- You need to derive at least these two classes

-- Automatically builds the Spark datatype (schema) that corresponds to this
-- Haskell representation.
instance SQLTypeable Tree
-- Automatically infers the converters between the Spark data format and the
-- Haskell in-memory representation.
instance ToSQL Tree

-- These accessors must be written by hand for now, but they could be
-- derived automatically in the future using Template Haskell.
treeId' :: StaticColProjection Tree Int
treeId' = unsafeStaticProjection buildType "treeId"
treeWidth' :: StaticColProjection Tree Int
treeWidth' = unsafeStaticProjection buildType "treeWidth"
treeHeight' :: StaticColProjection Tree Int
treeHeight' = unsafeStaticProjection buildType "treeHeight"
-- Relates the Tree record to its tuple representation (field names in
-- positional order), so that a tuple of columns can be packed into a
-- Dataset Tree later on.
instance TupleEquivalence Tree (Int, Int, Int) where
  tupleFieldNames = NameTuple ["treeId", "treeWidth", "treeHeight"]

We can now take a dataframe and attempt to cast it to a (typed) dataset. Since this operation can fail, the result is wrapped in a Try.


In [13]:
tTreesDS = asDS treesDF :: Try (Dataset Tree)
tTreesDS


Right constant_d5e4f2@org.spark.Constant{{treeId:int treeWidth:int treeHeight:int}}

In [14]:
asDS outputDF :: Try (Dataset Tree)


Left (Error {ePath = NPath(), eMessage = "Casting error: dataframe has type {treeId:int doubleWidth:int} incompatible with type {treeId:int treeWidth:int treeHeight:int}"})

Since we know that this cast is going to work, and since we are doing exploratory analysis, we are going to unwrap the Try and look at the dataset directly. This code will throw an exception if the types are not compatible:


In [15]:
-- Imports `forceRight`. It will be exposed in the public API in the future
import Spark.Core.Internal.Utilities
treesDS = forceRight (asDS treesDF) :: Dataset Tree
treesDS


constant_d5e4f2@org.spark.Constant{{treeId:int treeWidth:int treeHeight:int}}

All the operations on the dataframes can now be checked by the compiler:


In [16]:
col1 = treesDS // treeId'
:t col1
col1


col1 :: Column Tree Int
treeId{int}->constant_d5e4f2

We can still do some dynamic matching if we prefer, but we get a dynamic column instead.


In [17]:
col1' = treesDS // "treeId"
:t col1'
col1'


col1' :: DynColumn
Right treeId{int}->constant_d5e4f2

After manipulating columns, all the data can be packed as a tuple or as some other type. The following operations are fully type-checked; try to change the types to see what happens:


In [18]:
outputDS = pack (col1, treesDS//treeWidth') :: Dataset (Int, Int)
-- Or we can get our trees back
outputDS2 = pack (treesDS//treeId', treesDS//treeWidth', treesDS//treeHeight') :: Dataset Tree
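
For example, the following annotation would be rejected by GHC, since the second column carries Int values (the exact error message depends on the compiler and library versions):

-- pack (col1, treesDS//treeWidth') :: Dataset (Int, Double)   -- does not typecheck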

This still lets us do some bogus operations, because we use primitive types to represent the data:


In [19]:
-- Some curious operation
curious = (treesDS//treeWidth') + (treesDS//treeId')
:t curious
curious


curious :: Column Tree Int
sum(treeWidth, treeId){int}->constant_d5e4f2

Domain-specific static typing

Of course, we do not want to mix these different quantities together, any more than we would in regular Haskell code. Using newtypes, we can tell Krapsh to use distinct types in Haskell while still using the same datatype representation in Spark.


In [20]:
-- We are not allowing arithmetic operations on the ids anymore
newtype MyId = MyId Int deriving (Generic, Show)
instance SQLTypeable MyId
instance ToSQL MyId

-- We allow arithmetic operations on the new Length type by deriving Num
newtype Length = Length Int deriving (Generic, Num, Show)
instance SQLTypeable Length
instance ToSQL Length
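
As a plain Haskell aside (nothing Spark-specific here): because Length derives Num, local Length values support arithmetic, while the same expression on MyId is rejected at compile time.

Length 3 + Length 2   -- typechecks and evaluates to Length 5
-- MyId 3 + MyId 2    -- rejected: MyId has no Num instance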

Let us define our new 'safer' tree structure. Because of Haskell's limitations with record field names, and because all the structures live in the same notebook, we have to pick different names for the fields. In practice, these structures would not get mixed up.


In [21]:
data STree = STree {
  sTreeId :: MyId,
  sTreeWidth :: Length,
  sTreeHeight :: Length } deriving (Generic, Show)

instance SQLTypeable STree
instance ToSQL STree

-- These accessors must be written by hand for now, but they could be
-- derived automatically in the future using Template Haskell.
sTreeId' :: StaticColProjection STree MyId
sTreeId' = unsafeStaticProjection buildType "sTreeId"
sTreeWidth' :: StaticColProjection STree Length
sTreeWidth' = unsafeStaticProjection buildType "sTreeWidth"
sTreeHeight' :: StaticColProjection STree Length
sTreeHeight' = unsafeStaticProjection buildType "sTreeHeight"
instance TupleEquivalence STree (MyId, Length, Length) where
  tupleFieldNames = NameTuple ["sTreeId", "sTreeWidth", "sTreeHeight"]

Because of the name change, the field names of our previous dataframe no longer match those of this dataset. The cast still goes through, though: only the shape of the data is checked, and the fields are relabelled along the way.

NOTE: this behaviour may change in the future, for instance by checking the field names in addition to the types.


In [22]:
forceRight (asDS treesDF) :: Dataset STree


constant_d5e4f2@org.spark.Constant{{sTreeId:int sTreeWidth:int sTreeHeight:int}}

We are going to do some gymnastics with the columns. There are two choices: either we build a dataframe first and then type-check it as a whole, or we type-check each column of the dataframe first and then combine the checked columns in a safe manner.

Here is the first option:


In [23]:
-- We can build a structure first and convert it to a dataframe:
str = struct' [ (treesDF//"treeId") @@ "sTreeId",
                (treesDF//"treeWidth") @@ "sTreeWidth",
                (treesDF//"treeHeight") @@ "sTreeHeight"]
treesDF2 = pack' str
treesDS2 = forceRight (asDS treesDF2) :: Dataset STree
:t treesDS2
treesDS2


treesDS2 :: Dataset STree
select_2f25e5@org.spark.Select{{sTreeId:int sTreeWidth:int sTreeHeight:int}}

And here is the second option, using typed columns. The do ... return block collects all the possible failures when extracting the typed columns.


In [24]:
tTreesDS2 = do
  idCol <- castCol' (buildType::SQLType MyId) (treesDF//"treeId")
  widthCol <- castCol' (buildType::SQLType Length) (treesDF//"treeWidth")
  heightCol <- castCol' (buildType::SQLType Length) (treesDF//"treeHeight")
  -- This operation is type-safe
  let s = pack (idCol, widthCol, heightCol) :: Dataset STree
  return s
treesDS2 = forceRight tTreesDS2
:t treesDS2
treesDS2


treesDS2 :: Dataset STree
select_d40cf1@org.spark.Select{{sTreeId:int sTreeWidth:int sTreeHeight:int}}

Now all the data can be manipulated in a type-safe manner. Under the hood, all these types will be unwrapped to Spark's primitive types.


In [25]:
idCol = treesDS2 // sTreeId'
:t idCol
idCol


idCol :: Column STree MyId
sTreeId{int}->select_d40cf1

In [26]:
-- This will not work anymore:
idCol + idCol


No instance for (Num MyId) arising from a use of `+'
In the expression: idCol + idCol
In an equation for `it': it = idCol + idCol

In [27]:
-- But this will still work:
widthCol = treesDS2//sTreeWidth'
heightCol = treesDS2//sTreeHeight'
volumeCol = (widthCol + heightCol) @@ "volume"
:t volumeCol
volumeCol


volumeCol :: Column STree Length
volume{int}->select_d40cf1

Potentially illegal casting operations will not work:


In [28]:
pack (idCol, volumeCol) :: Dataset (Int, Int)


No instance for (krapsh-0.1.9.0:Spark.Core.Internal.FunctionsInternals.StaticColPackable2 STree (krapsh-0.1.9.0:Spark.Core.Internal.ColumnStructures.ColumnData STree Length) Int)
arising from a use of `pack'
In the expression: pack (idCol, volumeCol) :: Dataset (Int, Int)
In an equation for `it': it = pack (idCol, volumeCol) :: Dataset (Int, Int)

In [29]:
pack (idCol, volumeCol) :: Dataset (MyId, Length)


select_cc118d@org.spark.Select{{_1:int _2:int}}

And of course, the final result can always be converted back to a dataframe if it is more convenient:


In [30]:
pack' [untypedCol idCol, untypedCol volumeCol]


Right select_bd4659@org.spark.Select{{sTreeId:int volume:int}}

To conclude, Krapsh lets you use Haskell's type checking as an opt-in, compile-time check. You can still mix and match both styles whenever that is more convenient.
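
As a recap, here is the round trip between the two styles, reusing only bindings and functions defined earlier in this notebook:

-- Untyped -> typed: cast a dataframe into a dataset (the result is wrapped in a Try)
asDS treesDF :: Try (Dataset Tree)
-- Typed -> untyped: forget the column types and pack everything back into a dataframe
pack' [untypedCol (treesDS // treeId'), untypedCol (treesDS // treeHeight')]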